# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: site-packages\sqlalchemy\engine\url.py
"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
information about a database connection specification.

The URL object is created automatically when
:func:`~sqlalchemy.engine.create_engine` is called with a string
argument; alternatively, the URL is a public-facing construct which can
be used directly and is also accepted directly by ``create_engine()``.
"""
import re
from .. import exc, util
from . import Dialect
from ..dialects import registry, plugins


class URL(object):
    """
    Represent the components of a URL used to connect to a database.

    This object is suitable to be passed directly to a
    :func:`~sqlalchemy.create_engine` call.  The fields of the URL are parsed
    from a string by the :func:`.make_url` function.  The string
    format of the URL is an RFC-1738-style string.

    All initialization parameters are available as public attributes.

    :param drivername: the name of the database backend.
      This name will correspond to a module in sqlalchemy/databases
      or a third party plug-in.

    :param username: The user name.

    :param password: database password.

    :param host: The name of the host.

    :param port: The port number.  Converted with ``int()`` when given.

    :param database: The database name.

    :param query: A dictionary of options to be passed to the
      dialect and/or the DBAPI upon connect.

    """

    def __init__(
        self,
        drivername,
        username=None,
        password=None,
        host=None,
        port=None,
        database=None,
        query=None,
    ):
        self.drivername = drivername
        self.username = username
        # The raw password object is kept as given; the ``password`` property
        # converts it to text on access.
        self.password_original = password
        self.host = host
        self.port = int(port) if port is not None else None
        self.database = database
        self.query = query or {}

    def __to_string__(self, hide_password=True):
        """Render this URL as a string.

        :param hide_password: when true, the password (if any) is rendered
            as ``***`` instead of its quoted value.
        :return: the URL string; always a ``str``, including when no host
            is set (e.g. ``sqlite:///file.db``).
        """
        s = self.drivername + "://"
        if self.username is not None:
            s += _rfc_1738_quote(self.username)
            if self.password is not None:
                s += ":" + ("***" if hide_password else _rfc_1738_quote(self.password))
            s += "@"
        if self.host is not None:
            # A host containing ":" is rendered in brackets so it cannot be
            # confused with the ":port" separator.
            if ":" in self.host:
                s += "[%s]" % self.host
            else:
                s += self.host
        # Port, database and query are independent of the host: a URL without
        # a host may still carry any of them.
        if self.port is not None:
            s += ":" + str(self.port)
        if self.database is not None:
            s += "/" + self.database
        if self.query:
            # Keys are sorted so the rendering (and therefore the hash) is
            # deterministic.
            s += "?" + "&".join(
                "%s=%s" % (k, self.query[k]) for k in sorted(self.query)
            )
        return s

    def __str__(self):
        """Return the URL string including the real password."""
        return self.__to_string__(hide_password=False)

    def __repr__(self):
        """Return the URL string with the password masked."""
        return self.__to_string__()

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        """Compare all URL components, including the port.

        The port takes part in the comparison because it is part of the
        string that ``__hash__`` is computed from; objects that compare equal
        must hash equal.
        """
        return (
            isinstance(other, URL)
            and self.drivername == other.drivername
            and self.username == other.username
            and self.password == other.password
            and self.host == other.host
            and self.port == other.port
            and self.database == other.database
            and self.query == other.query
        )

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``.
        return not self == other

    @property
    def password(self):
        """The password as text, or ``None`` when no password was given."""
        if self.password_original is None:
            return None
        return util.text_type(self.password_original)

    @password.setter
    def password(self, password):
        self.password_original = password

    def get_backend_name(self):
        """Return the part of ``drivername`` before the first ``+``
        (the whole name when there is no ``+``)."""
        return self.drivername.split("+")[0]

    def get_driver_name(self):
        """Return the driver part of ``drivername``.

        When ``drivername`` has no ``+driver`` suffix, the ``driver``
        attribute of the resolved dialect class is returned instead.
        """
        if "+" not in self.drivername:
            return self.get_dialect().driver
        else:
            return self.drivername.split("+")[1]

    def _instantiate_plugins(self, kwargs):
        """Load and instantiate every plugin named under the ``plugin`` key
        of the query dictionary, passing each ``(self, kwargs)``."""
        plugin_names = util.to_list(self.query.get("plugin", ()))
        return [plugins.load(plugin_name)(self, kwargs) for plugin_name in plugin_names]

    def _get_entrypoint(self):
        """Return the "entry point" dialect class.

        This is normally the dialect itself except in the case when the
        returned class implements the get_dialect_cls() method.

        """
        if "+" not in self.drivername:
            name = self.drivername
        else:
            name = self.drivername.replace("+", ".")
        cls = registry.load(name)
        # The registry may hand back an object (e.g. a module) that exposes
        # the actual dialect class as its ``dialect`` attribute; unwrap it.
        if (
            hasattr(cls, "dialect")
            and isinstance(cls.dialect, type)
            and issubclass(cls.dialect, Dialect)
        ):
            return cls.dialect
        # Otherwise the loaded object is itself the entry point.
        return cls

    def get_dialect(self):
        """Return the SQLAlchemy database dialect class corresponding
        to this URL's driver name.
        """
        entrypoint = self._get_entrypoint()
        dialect_cls = entrypoint.get_dialect_cls(self)
        return dialect_cls

    def translate_connect_args(self, names=None, **kw):
        r"""Translate url attributes into a dictionary of connection arguments.

        Returns attributes of this url (`host`, `database`, `username`,
        `password`, `port`) as a plain dictionary.  The attribute names are
        used as the keys by default.  Unset or false attributes are omitted
        from the final dictionary.

        :param \**kw: Optional, alternate key names for url attributes.
            Mapping an attribute to ``None`` drops it from the result.

        :param names: Deprecated.  Same purpose as the keyword-based alternate
            names, but correlates the name to the original positionally.
            The passed sequence is not modified.
        """
        # Work on a private copy so the caller's sequence is left untouched.
        positional = list(names) if names else []
        translated = {}
        for sname in ("host", "database", "username", "password", "port"):
            if positional:
                name = positional.pop(0)
            elif sname in kw:
                name = kw[sname]
            else:
                name = sname
            value = getattr(self, sname, False)
            # Skip attributes that are unset/false or explicitly renamed to
            # None.
            if name is not None and value:
                translated[name] = value

        return translated


def make_url(name_or_url):
    """Produce a URL instance from a string, or pass a non-string through.

    A string argument is handed to the RFC 1738 parser and the resulting
    URL object is returned.  Any other argument (typically an existing URL
    object) is returned unchanged.
    """
    if not isinstance(name_or_url, util.string_types):
        # Not a string: nothing to parse, give the object back as-is.
        return name_or_url
    return _parse_rfc1738_args(name_or_url)


def _parse_rfc1738_args(name):
    """Parse an RFC-1738-style connection string into a :class:`URL`.

    The string is matched as
    ``name://[username[:password]@][host][:port][/database[?query]]`` where
    the host may be written in square brackets.  Username and password are
    passed through ``_rfc_1738_unquote``; the query portion, when present and
    non-empty, is parsed into a dictionary.

    :param name: the connection string to parse.
    :return: a new :class:`URL` built from the parsed components.
    :raises exc.ArgumentError: if ``name`` does not match the URL pattern.
    """
    pattern = re.compile(
        "\n            (?P<name>[\\w\\+]+)://\n            (?:\n                (?P<username>[^:/]*)\n                (?::(?P<password>.*))?\n            @)?\n            (?:\n                (?:\n                    \\[(?P<ipv6host>[^/]+)\\] |\n                    (?P<ipv4host>[^/:]+)\n                )?\n                (?::(?P<port>[^/]*))?\n            )?\n            (?:/(?P<database>.*))?\n            ",
        re.X,
    )
    m = pattern.match(name)
    if m is None:
        raise exc.ArgumentError("Could not parse rfc1738 URL from string '%s'" % name)

    components = m.groupdict()

    # The regex captures "database?query" as one group; split them apart.
    # Only the text between the first and second "?" is used as the query.
    query = None
    if components["database"] is not None:
        tokens = components["database"].split("?", 2)
        components["database"] = tokens[0]
        if len(tokens) > 1:
            # An empty query string yields None rather than an empty dict.
            query = dict(util.parse_qsl(tokens[1])) or None
        if util.py2k and query is not None:
            # On Python 2 the keys are encoded to ASCII byte strings.
            query = {k.encode("ascii"): query[k] for k in query}
    components["query"] = query

    if components["username"] is not None:
        components["username"] = _rfc_1738_unquote(components["username"])
    if components["password"] is not None:
        components["password"] = _rfc_1738_unquote(components["password"])

    # The two alternative host captures collapse into the single ``host``
    # argument accepted by URL.
    ipv4host = components.pop("ipv4host")
    ipv6host = components.pop("ipv6host")
    components["host"] = ipv4host or ipv6host

    drivername = components.pop("name")
    return URL(drivername, **components)


def _rfc_1738_quote(text):
    return re.sub("[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)


def _rfc_1738_unquote(text):
    """Return ``text`` after passing it through ``util.unquote``.

    Used by the URL parser on the username and password components.
    """
    unquoted = util.unquote(text)
    return unquoted


def _parse_keyvalue_args(name):
    """Parse a ``drivername://key=value&key=value`` string into a :class:`URL`.

    Everything after ``://`` is parsed as a query string and the resulting
    key/value pairs are passed to :class:`URL` as keyword arguments.

    :param name: the string to parse.
    :return: a new :class:`URL`, or ``None`` if ``name`` does not have the
        ``word://...`` form.
    """
    m = re.match("(\\w+)://(.*)", name)
    if m is None:
        return None
    drivername, args = m.group(1, 2)
    opts = dict(util.parse_qsl(args))
    # Expand as keywords: ``*opts`` would pass the dict's *keys* positionally,
    # turning option names into the username, password, etc.
    return URL(drivername, **opts)
